{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Copyright 2014 Brett Slatkin, Pearson Education Inc.\n",
    "#\n",
    "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "# you may not use this file except in compliance with the License.\n",
    "# You may obtain a copy of the License at\n",
    "#\n",
    "#     http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License.\n",
    "\n",
    "# Preamble to mimic book environment\n",
    "import logging\n",
    "from pprint import pprint\n",
    "from sys import stdout as STDOUT"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 1\n",
    "def download(item):\n",
    "    \"\"\"Placeholder for the download stage: returns item unchanged.\"\"\"\n",
    "    return item\n",
    "\n",
    "def resize(item):\n",
    "    \"\"\"Placeholder for the resize stage: returns item unchanged.\"\"\"\n",
    "    return item\n",
    "\n",
    "def upload(item):\n",
    "    \"\"\"Placeholder for the upload stage: returns item unchanged.\"\"\"\n",
    "    return item"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 2\n",
    "from threading import Lock\n",
    "from collections import deque\n",
    "\n",
    "class MyQueue(object):\n",
    "    \"\"\"Minimal FIFO queue: a deque whose accesses are guarded by a lock.\"\"\"\n",
    "\n",
    "    def __init__(self):\n",
    "        self.lock = Lock()\n",
    "        self.items = deque()\n",
    "\n",
    "\n",
    "# Example 3\n",
    "    def put(self, item):\n",
    "        \"\"\"Append item to the tail of the queue while holding the lock.\"\"\"\n",
    "        with self.lock:\n",
    "            self.items.append(item)\n",
    "\n",
    "\n",
    "# Example 4\n",
    "    def get(self):\n",
    "        \"\"\"Remove and return the head item.\n",
    "\n",
    "        Raises IndexError (from deque.popleft) when the queue is empty.\n",
    "        \"\"\"\n",
    "        with self.lock:\n",
    "            oldest = self.items.popleft()\n",
    "        return oldest"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 5\n",
    "from threading import Thread\n",
    "from time import sleep\n",
    "\n",
    "class Worker(Thread):\n",
    "    \"\"\"Thread that polls in_queue, applies func and puts results on out_queue.\n",
    "\n",
    "    polled_count counts loop iterations; work_done counts processed items.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, func, in_queue, out_queue):\n",
    "        super().__init__()\n",
    "        self.func, self.in_queue, self.out_queue = func, in_queue, out_queue\n",
    "        self.polled_count = 0\n",
    "        self.work_done = 0\n",
    "\n",
    "\n",
    "# Example 6\n",
    "    def run(self):\n",
    "        \"\"\"Poll in_queue until reading from it raises AttributeError.\n",
    "\n",
    "        An IndexError from get() means there is nothing to do yet, so the\n",
    "        loop sleeps 0.01 seconds and polls again.\n",
    "        \"\"\"\n",
    "        while True:\n",
    "            self.polled_count += 1\n",
    "            try:\n",
    "                item = self.in_queue.get()\n",
    "            except AttributeError:\n",
    "                # The magic exit signal\n",
    "                return\n",
    "            except IndexError:\n",
    "                sleep(0.01)  # No work to do\n",
    "                continue\n",
    "            # Only reached when get() produced an item.\n",
    "            self.out_queue.put(self.func(item))\n",
    "            self.work_done += 1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 7\n",
    "# One queue feeds each stage; done_queue collects the output of the\n",
    "# last stage.\n",
    "download_queue = MyQueue()\n",
    "resize_queue = MyQueue()\n",
    "upload_queue = MyQueue()\n",
    "done_queue = MyQueue()\n",
    "# Each worker reads from its own stage's queue and writes to the queue\n",
    "# of the stage that follows it.\n",
    "threads = [\n",
    "    Worker(download, download_queue, resize_queue),\n",
    "    Worker(resize, resize_queue, upload_queue),\n",
    "    Worker(upload, upload_queue, done_queue),\n",
    "]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 8\n",
    "# Start every worker, then feed 1000 placeholder objects into the\n",
    "# first stage of the pipeline.\n",
    "for thread in threads:\n",
    "    thread.start()\n",
    "for _ in range(1000):\n",
    "    download_queue.put(object())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 9\n",
    "import time\n",
    "# Busy-wait until all 1000 items have reached the final queue.\n",
    "while len(done_queue.items) < 1000:\n",
    "    # Do something useful while waiting\n",
    "    time.sleep(0.1)\n",
    "# Stop all the threads by causing an exception in their\n",
    "# run methods.\n",
    "for thread in threads:\n",
    "    thread.in_queue = None\n",
    "# Wait for every worker to hit the exit signal and return, so the\n",
    "# polled_count values are final before anything reads them.\n",
    "for thread in threads:\n",
    "    thread.join()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Processed 1000 items after polling 3031 times\n"
     ]
    }
   ],
   "source": [
    "# Example 10\n",
    "# Compare the number of finished items with the total number of loop\n",
    "# iterations the workers spent polling their input queues.\n",
    "processed = len(done_queue.items)\n",
    "polled = sum(t.polled_count for t in threads)\n",
    "print('Processed', processed, 'items after polling',\n",
    "      polled, 'times')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Consumer waiting\n"
     ]
    }
   ],
   "source": [
    "# Example 11\n",
    "from queue import Queue\n",
    "queue = Queue()\n",
    "\n",
    "def consumer():\n",
    "    \"\"\"Take one item from the module-level queue, printing before and after.\"\"\"\n",
    "    print('Consumer waiting')\n",
    "    queue.get()                # Runs after put() below\n",
    "    print('Consumer done')\n",
    "\n",
    "# The queue is still empty here, so the consumer thread blocks inside\n",
    "# get() until the next cell puts an item.\n",
    "thread = Thread(target=consumer)\n",
    "thread.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Producer putting\n",
      "Consumer done\n",
      "Producer done\n"
     ]
    }
   ],
   "source": [
    "# Example 12\n",
    "print('Producer putting')\n",
    "queue.put(object())            # Runs before get() above\n",
    "# Wait for the consumer thread started in the previous cell to finish.\n",
    "thread.join()\n",
    "print('Producer done')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 13\n",
    "queue = Queue(1)               # Buffer size of 1\n",
    "\n",
    "def consumer():\n",
    "    \"\"\"Pause briefly, then take two items from the queue, reporting each.\n",
    "\n",
    "    Uses the time module imported by an earlier cell (Example 9).\n",
    "    \"\"\"\n",
    "    time.sleep(0.1)            # Wait\n",
    "    queue.get()                # Runs second\n",
    "    print('Consumer got 1')\n",
    "    queue.get()                # Runs fourth\n",
    "    print('Consumer got 2')\n",
    "\n",
    "thread = Thread(target=consumer)\n",
    "thread.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Producer put 1\n",
      "Consumer got 1\n",
      "Producer put 2\n",
      "Consumer got 2\n",
      "Producer done\n"
     ]
    }
   ],
   "source": [
    "# Example 14\n",
    "# The queue holds at most one item, so the second put() has to wait\n",
    "# until the consumer has taken the first one.\n",
    "queue.put(object())            # Runs first\n",
    "print('Producer put 1')\n",
    "queue.put(object())            # Runs third\n",
    "print('Producer put 2')\n",
    "thread.join()\n",
    "print('Producer done')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Consumer waiting\n"
     ]
    }
   ],
   "source": [
    "# Example 15\n",
    "in_queue = Queue()\n",
    "\n",
    "def consumer():\n",
    "    \"\"\"Take one item from in_queue, then report it finished via task_done().\"\"\"\n",
    "    print('Consumer waiting')\n",
    "    work = in_queue.get()      # Done second\n",
    "    print('Consumer working')\n",
    "    # Doing work\n",
    "    print('Consumer done')\n",
    "    in_queue.task_done()       # Done third\n",
    "\n",
    "# The thread object is not kept; the next cell synchronizes through\n",
    "# in_queue.join() instead of Thread.join().\n",
    "Thread(target=consumer).start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Producer waiting\n",
      "Consumer working\n",
      "Consumer done\n",
      "Producer done\n"
     ]
    }
   ],
   "source": [
    "# Example 16\n",
    "in_queue.put(object())         # Done first\n",
    "print('Producer waiting')\n",
    "# Queue.join() returns once task_done() has been called for every item\n",
    "# that was put on the queue.\n",
    "in_queue.join()                # Done fourth\n",
    "print('Producer done')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 17\n",
    "class ClosableQueue(Queue):\n",
    "    \"\"\"Queue that is closed by enqueuing a sentinel; iterating stops at it.\"\"\"\n",
    "\n",
    "    SENTINEL = object()\n",
    "\n",
    "    def close(self):\n",
    "        \"\"\"Enqueue the sentinel that tells the consuming iterator to stop.\"\"\"\n",
    "        self.put(self.SENTINEL)\n",
    "\n",
    "# Example 18\n",
    "    def __iter__(self):\n",
    "        \"\"\"Yield items from get() until the sentinel is received.\n",
    "\n",
    "        task_done() is called once for every value taken off the queue,\n",
    "        the sentinel included.\n",
    "        \"\"\"\n",
    "        reached_end = False\n",
    "        while not reached_end:\n",
    "            received = self.get()\n",
    "            try:\n",
    "                if received is not self.SENTINEL:\n",
    "                    yield received\n",
    "                else:\n",
    "                    reached_end = True  # Cause the thread to exit\n",
    "            finally:\n",
    "                self.task_done()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 19\n",
    "class StoppableWorker(Thread):\n",
    "    \"\"\"Thread that processes items by iterating over in_queue until it ends.\"\"\"\n",
    "\n",
    "    def __init__(self, func, in_queue, out_queue):\n",
    "        super().__init__()\n",
    "        self.func, self.in_queue, self.out_queue = func, in_queue, out_queue\n",
    "\n",
    "    def run(self):\n",
    "        \"\"\"Apply func to each item from in_queue and put the result on out_queue.\"\"\"\n",
    "        for item in self.in_queue:\n",
    "            self.out_queue.put(self.func(item))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 20\n",
    "# Rebuild the pipeline with closable queues; these names replace the\n",
    "# MyQueue-based objects created in Example 7.\n",
    "download_queue = ClosableQueue()\n",
    "resize_queue = ClosableQueue()\n",
    "upload_queue = ClosableQueue()\n",
    "done_queue = ClosableQueue()\n",
    "threads = [\n",
    "    StoppableWorker(download, download_queue, resize_queue),\n",
    "    StoppableWorker(resize, resize_queue, upload_queue),\n",
    "    StoppableWorker(upload, upload_queue, done_queue),\n",
    "]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 21\n",
    "for thread in threads:\n",
    "    thread.start()\n",
    "for _ in range(1000):\n",
    "    download_queue.put(object())\n",
    "# Enqueue the sentinel after the last real item so the download worker\n",
    "# stops iterating once it has consumed everything before it.\n",
    "download_queue.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1000 items finished\n"
     ]
    }
   ],
   "source": [
    "# Example 22\n",
    "# Shut the stages down in pipeline order: wait until one stage's input\n",
    "# queue has been fully processed, then close the next stage's queue so\n",
    "# its sentinel lands behind every item the previous stage produced.\n",
    "download_queue.join()\n",
    "resize_queue.close()\n",
    "resize_queue.join()\n",
    "upload_queue.close()\n",
    "upload_queue.join()\n",
    "print(done_queue.qsize(), 'items finished')"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
